09 Operations and Pipelines

When interactively exploring a dataset you often end up interleaving visualization and analysis code. In HoloViews your visualization and your data are one and the same, so analysis and data transformations can be applied directly to the visualizable data. For that purpose HoloViews provides operations, which can be used to implement any analysis or data transformation you might want to do. Operations take a HoloViews Element and return another Element of either the same type or a new type, depending on the operation. We'll illustrate operations and pipelines using a variety of libraries:

Since operations know about HoloViews, you can apply them to large collections of data collected in HoloMap and DynamicMap containers, and in the DynamicMap case the operation is applied lazily. This feature allows us to chain multiple operations into a data analysis, processing, and visualization pipeline, e.g. to drive the operation of a dashboard.

Pipelines built using DynamicMap and HoloViews operations are also useful for caching intermediate results and just-in-time computations, because they lazily (re)compute just the part of the pipeline that has changed.
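The caching behavior can be pictured with a tiny plain-Python sketch (purely illustrative, not the HoloViews implementation): each pipeline stage memoizes its last result and recomputes only when its input actually changes.

```python
class Stage:
    """Toy pipeline stage: memoizes its output and recomputes only
    when its (possibly upstream-supplied) input actually changes."""
    def __init__(self, fn, upstream=None):
        self.fn, self.upstream = fn, upstream
        self._cache_key = None
        self._cache_val = None
        self.calls = 0  # count real computations

    def __call__(self, x):
        data = self.upstream(x) if self.upstream else x
        key = repr(data)
        if key != self._cache_key:       # only recompute on change
            self.calls += 1
            self._cache_val = self.fn(data)
            self._cache_key = key
        return self._cache_val

smooth = Stage(lambda xs: [round(sum(xs) / len(xs), 3)])
shade = Stage(lambda xs: [x * 10 for x in xs], upstream=smooth)

shade([1, 2, 3])   # both stages compute
shade([1, 2, 3])   # unchanged input: cached, no recomputation
shade([4, 5, 6])   # changed input: both stages recompute
```

After these three calls each stage has computed only twice, which is the behavior that makes lazy DynamicMap pipelines cheap to re-render.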

In [1]:
import time
import param
import numpy as np
import pandas as pd
import holoviews as hv
import datashader as ds

from bokeh.sampledata import stocks
from holoviews.operation import decimate
from holoviews.operation.timeseries import rolling, rolling_outlier_std
from holoviews.operation.datashader import datashade, dynspread, aggregate

hv.extension('bokeh')

Declare some data

In this example we'll work with a timeseries that stands in for stock-price data. We'll define a small function to generate a random, noisy timeseries, then define a DynamicMap that will generate a timeseries for each stock symbol:

In [2]:
def time_series(T=1, N=100, mu=0.1, sigma=0.1, S0=20):  
    """Parameterized noisy time series"""
    dt = float(T)/N
    t = np.linspace(0, T, N)
    W = np.random.standard_normal(size = N) 
    W = np.cumsum(W)*np.sqrt(dt) # standard brownian motion
    X = (mu-0.5*sigma**2)*t + sigma*W 
    S = S0*np.exp(X) # geometric brownian motion
    return S

def load_symbol(symbol, **kwargs):
    # Generate fresh synthetic data for each symbol
    # (the symbol itself is ignored by this stand-in loader)
    return hv.Curve(time_series(N=10000), kdims=[('time', 'Time')],
                    vdims=[('adj_close', 'Adjusted Close')])

stock_symbols = ['AAPL', 'FB', 'IBM', 'GOOG', 'MSFT']
dmap = hv.DynamicMap(load_symbol, kdims=['Symbol']).redim.values(Symbol=stock_symbols)

We will start by visualizing this data as-is:

In [3]:
%opts Curve [width=600] {+framewise}
dmap
Out[3]:

Applying an operation

Now let's start applying some operations to this data. HoloViews ships with two ready-to-use timeseries operations: the rolling operation, which applies a function over a rolling window, and the rolling_outlier_std operation, which computes outlier points in a timeseries. Specifically, rolling_outlier_std selects the points that lie more than sigma (a parameter measured in standard deviations) away from the rolling mean. This is just one example; you can easily write your own operations that do whatever you like.
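Under the hood these are simple windowed statistics. The following pandas sketch is illustrative only, not the operations' actual implementation, and the window/sigma values here are arbitrary choices:

```python
import numpy as np
import pandas as pd

np.random.seed(42)
prices = pd.Series(20 + np.random.standard_normal(200).cumsum())

window, sigma = 30, 2.0
mean = prices.rolling(window, center=True).mean()   # smoothed curve
std = prices.rolling(window, center=True).std()

# Outliers: points more than `sigma` rolling standard deviations from the mean
outliers = prices[(prices - mean).abs() > sigma * std]
```

The smoothed series corresponds to what rolling overlays as a Curve, and the outlier points to what rolling_outlier_std returns as a Scatter.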

In [4]:
%opts Scatter (color='indianred')
smoothed = rolling(dmap, rolling_window=30)
outliers = rolling_outlier_std(dmap, rolling_window=30)
smoothed * outliers
Out[4]:

As you can see, the operations transform the Curve element into a smoothed version and into a set of Scatter points containing the outliers, both computed with a rolling_window of 30. Since we applied the operations to a DynamicMap, they are lazy and only compute the result when it is requested.

In [5]:
# Exercise: Apply the rolling and rolling_outlier_std operations changing the rolling_window and sigma parameters

Linking operations to streams

Instead of supplying the parameter values for each operation explicitly as a scalar value, we can also define a Stream that will let us update our visualization dynamically. By supplying a Stream with a rolling_window parameter to both operations, we can now generate our own events on the stream and watch our visualization update each time.
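Conceptually, a stream is just a named bundle of parameters plus a list of subscribers that are called whenever event() pushes new values. A minimal plain-Python model of that pattern (not the HoloViews implementation) looks like this:

```python
class ToyStream:
    """Minimal model of a parameterized stream: event() updates the
    parameters and notifies every subscriber with the new values."""
    def __init__(self, **params):
        self.params = params
        self.subscribers = []

    def add_subscriber(self, fn):
        self.subscribers.append(fn)

    def event(self, **kwargs):
        self.params.update(kwargs)
        for fn in self.subscribers:
            fn(**self.params)

windows = []
stream = ToyStream(rolling_window=5)
stream.add_subscriber(lambda rolling_window: windows.append(rolling_window))
stream.event(rolling_window=30)   # subscriber runs with the new value
```

In HoloViews, the operation itself is the subscriber: passing streams=[stream] wires the operation's parameters up to the stream, so each stream.event call triggers a recomputation and a plot update.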

In [6]:
rolling_stream = hv.streams.Stream.define('rolling', rolling_window=5)
stream = rolling_stream()

rolled_dmap = rolling(dmap, streams=[stream])
outlier_dmap = rolling_outlier_std(dmap, streams=[stream])
rolled_dmap * outlier_dmap
Out[6]:
In [7]:
for i in range(20, 200, 20):
    time.sleep(0.2)
    stream.event(rolling_window=i)
In [8]:
# Exercise: Create a stream to control the sigma value and add it to the outlier operation,
#           then vary the sigma value and observe the effect

Defining operations

Defining custom operations is also very straightforward. For instance, let's define an operation to compute the residual between two overlaid Curve elements. All we need to do is subclass the Operation baseclass and define a _process method, which takes the Element or Overlay as input and returns a new Element. The residual operation can then be used to subtract the y-values of the second Curve from those of the first.

In [9]:
import param
from holoviews.operation import Operation

class residual(Operation):
    """
    Subtracts two curves from one another.
    """
    
    label = param.String(default='Residual', doc="""
        Defines the label of the returned Element.""")
    
    def _process(self, element, key=None):
        # Get first and second Element in overlay
        el1, el2 = element.get(0), element.get(1)
        
        # Get x-values and y-values of curves
        xvals  = el1.dimension_values(0)
        yvals1 = el1.dimension_values(1)
        yvals2 = el2.dimension_values(1)
        
        # Return new Element with subtracted y-values
        # and new label
        return el1.clone((xvals, yvals1-yvals2),
                         vdims=[self.p.label])

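Numerically, all _process does is subtract the two curves' y-values on their shared x-samples. A quick pandas sketch (illustrative only) of the residual between a smoothed curve and the raw curve it was derived from:

```python
import numpy as np
import pandas as pd

np.random.seed(7)
raw = pd.Series(np.random.standard_normal(300).cumsum())
smoothed = raw.rolling(30, center=True).mean()

# What residual's _process computes for residual(rolled_dmap * dmap):
# the first curve's y-values (smoothed) minus the second's (raw)
resid = smoothed - raw
```

The residual isolates the high-frequency noise that the rolling mean removed, which is exactly what the residual plot below visualizes.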
To see what that looks like in action let's try it out by comparing the smoothed and original Curve.

In [10]:
residual_dmap = residual(rolled_dmap * dmap)
residual_dmap
Out[10]:

Since the stream we created is linked to one of the inputs of residual_dmap, changing the stream values triggers updates both in the plot above and in our new residual plot.

In [11]:
for i in range(20, 200, 20):
    time.sleep(0.2)
    stream.event(rolling_window=i)

Chaining operations

Of course, since operations simply transform an Element in some way, they can easily be chained. As a simple example, we will take rolled_dmap and apply the datashade and dynspread operations to it to construct a datashaded version of the plot. As you'll see, this concise specification defines a complex analysis pipeline that gets reapplied whenever you change the Symbol or interact with the plot -- whenever the data needs to be updated.
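Since each operation maps an Element to an Element, chaining is just function composition. A plain-Python sketch of the idea (the real pipeline composes HoloViews operations; these stand-ins are hypothetical):

```python
from functools import reduce

def compose(*fns):
    """Compose left-to-right: compose(f, g)(x) == g(f(x))."""
    return lambda x: reduce(lambda acc, fn: fn(acc), fns, x)

# Toy stand-ins for rolling, datashade and dynspread
smooth = lambda xs: [sum(xs[max(0, i - 1):i + 2]) / len(xs[max(0, i - 1):i + 2])
                     for i in range(len(xs))]   # 3-point moving average
shade = lambda xs: [round(x, 1) for x in xs]    # quantize values
spread = lambda xs: [x * 2 for x in xs]         # amplify each value

pipeline = compose(smooth, shade, spread)
pipeline([1.0, 2.0, 6.0])
```

Each stage only needs to agree on its input/output type, which is why arbitrary operations can be snapped together into one pipeline.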

In [12]:
%%opts RGB [width=600 height=400] {+framewise}
overlay = dynspread(datashade(rolled_dmap)) * outlier_dmap
(overlay + residual_dmap).cols(1)
Out[12]:

Visualizing the pipeline

To understand what is going on we will write a small utility that traverses the output we just displayed above and visualizes each processing step leading up to it.

In [13]:
%%opts RGB Curve [width=250 height=200]

def traverse(obj, key, items=None):
    items = [] if items is None else items
    for inp in obj.callback.inputs[:1]:
        label = inp.callback.operation.name if isinstance(inp.callback, hv.core.OperationCallable) else 'price'
        if inp.last: items.append(inp[key].relabel(label))
        if isinstance(inp, hv.DynamicMap): traverse(inp, key, items)
    return list(hv.core.util.unique_iterator(items))[:-1]

hv.Layout(traverse(overlay, 'AAPL')).cols(4)
Out[13]:

Reading from right to left, the original price timeseries is first smoothed with a rolling window, then datashaded, and finally each pixel is spread to cover a larger area. As you can see, arbitrarily many standard or custom operations can be chained to capture even very complex workflows, which can then be replayed dynamically as needed.
